Dask LocalCluster - Larger-than-memory computation

In the ODC and Dask (LocalCluster) notebook we saw how Dask can speed up IO and computation by splitting operations into chunks and tasks, and how delayed evaluation and task-graph optimisation remove redundant tasks whose results are never used.

Using chunks provides one additional capability beyond parallelisation - the ability to perform computations on data that is larger than available memory.

Since Dask operates on chunks, it can process smaller pieces that each fit into memory even when the whole array does not. This is particularly useful when a large amount of data is being reduced, say by computing a seasonal mean.

As with parallelisation, not all algorithms can be broken into smaller pieces, so this won't always be possible. Dask arrays, though, go a long way towards making it easy for a great many operations.
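As a minimal sketch of the idea, a chunked reduction with dask.array alone (no ODC required) only ever holds a few chunks in memory at a time. The array size here is modest for illustration; the same pattern scales to arrays larger than memory:

```python
import dask.array as da

# A chunked array: dask only needs a few 32 MiB chunks in memory at a
# time, so the same pattern works even when the full array would not fit.
big = da.random.random((8192, 8192), chunks=(2048, 2048))

# The mean is built lazily, then computed chunk by chunk and combined.
result = big.mean().compute()
print(result)  # close to 0.5 for uniform random data
```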

Firstly, some initial imports...

In [2]:
# EASI tools
import git
import sys, os
os.environ['USE_PYGEOS'] = '0'
repo = git.Repo('.', search_parent_directories=True).working_tree_dir
if repo not in sys.path: sys.path.append(repo)
from easi_tools import EasiDefaults, notebook_utils
easi = EasiDefaults()
Successfully found configuration for deployment "chile"

We'll continue using the same algorithm as before, but this time we're going to increase its memory usage until it exceeds the LocalCluster's available memory. This example notebook is set up to run on a compute node with 28 GiB of available memory and 8 cores for the LocalCluster. We'll make that explicit here in case you are blessed with more resources.

Let's start the cluster...

In [3]:
from dask.distributed import Client, LocalCluster

cluster = LocalCluster(n_workers=2, threads_per_worker=4)
cluster.scale(n=2, memory="14GiB")
client = Client(cluster)
client
/env/lib/python3.10/site-packages/distributed/node.py:182: UserWarning: Port 8787 is already in use.
Perhaps you already have a cluster running?
Hosting the HTTP server on port 34023 instead
  warnings.warn(
Out[3]:

Client: Client-44f6695b-fa48-11ed-97f2-1eb1b782f397
  Connection method: Cluster object    Cluster type: distributed.LocalCluster
  Dashboard: http://127.0.0.1:34023/status

LocalCluster: 00ad2e07
  Workers: 2    Total threads: 8    Total memory: 29.00 GiB
  Status: running    Using processes: True

Scheduler: Scheduler-58ee65ae-5cb3-469c-b7ab-5b0a630d9c0d
  Comm: tcp://127.0.0.1:41199    Started: Just now

Worker 0: Comm: tcp://127.0.0.1:36641    Threads: 4    Memory: 14.50 GiB
  Nanny: tcp://127.0.0.1:42135    Local directory: /tmp/dask-worker-space/worker-617vkvrl
Worker 1: Comm: tcp://127.0.0.1:34969    Threads: 4    Memory: 14.50 GiB
  Nanny: tcp://127.0.0.1:41973    Local directory: /tmp/dask-worker-space/worker-pr7mt680

We can monitor memory usage on the workers using the dask dashboard URL below and the Status tab. The workers are local so this will be memory on the same compute node that Jupyter is running in.

In [4]:
dashboard_address = notebook_utils.localcluster_dashboard(client=client,server=easi.hub)
print(dashboard_address)
https://hub.datacubechile.cl/user/jhodge/proxy/34023/status

As we will be using Requester Pays buckets in AWS S3, we need to run the configure_s3_access() function below with the client option to ensure that Jupyter and the cluster have the correct permissions to be able to access the data.

In [5]:
from datacube.utils.aws import configure_s3_access
configure_s3_access(aws_unsigned=False, requester_pays=True, client=client);
In [6]:
import datacube
from datacube.utils import masking

dc = datacube.Datacube()
In [7]:
# Get the centroid of the coordinates in the default configuration
central_lat = sum(easi.latitude)/2
central_lon = sum(easi.longitude)/2

# or set your own by uncommenting and editing the following lines
# central_lat = -42.019
# central_lon = 146.615

# Set the buffer to load around the central coordinates
# The buffer is a radial distance from the central coordinates, so the
# bounding box spans 2x the buffer in each dimension
buffer = 0.05

# Compute the bounding box for the study area
study_area_lat = (central_lat - buffer, central_lat + buffer)
study_area_lon = (central_lon - buffer, central_lon + buffer)

# Data products - Landsat 8 ARD from Geoscience Australia
products = easi.product('landsat')

# Set the date range to load data over 
set_time = ("2021-01-01", "2021-12-31")

# Set the measurements/bands to load. None will load all of them
measurements = None

# Set the coordinate reference system and output resolution
set_crs = easi.crs('landsat')  # If defined, else None
set_resolution = easi.resolution('landsat')  # If defined, else None
# set_crs = "epsg:3577"
# set_resolution = (-30, 30)

group_by = "solar_day"
In [8]:
dataset = None # clear results from any previous runs
dataset = dc.load(
            product=products,
            x=study_area_lon,
            y=study_area_lat,
            time=set_time,
            measurements=measurements,
            resampling={"qa_pixel": "nearest", "*": "average"},
            output_crs=set_crs,
            resolution=set_resolution,
            dask_chunks={"time": 1},
            group_by=group_by,
        )
dataset
Out[8]:
<xarray.Dataset>
Dimensions:      (time: 22, y: 381, x: 335)
Coordinates:
  * time         (time) datetime64[ns] 2021-01-03T14:39:19.317361 ... 2021-12...
  * y            (y) float64 6.692e+06 6.692e+06 ... 6.681e+06 6.681e+06
  * x            (x) float64 8.572e+05 8.572e+05 ... 8.672e+05 8.672e+05
    spatial_ref  int32 32718
Data variables:
    coastal      (time, y, x) uint16 dask.array<chunksize=(1, 381, 335), meta=np.ndarray>
    blue         (time, y, x) uint16 dask.array<chunksize=(1, 381, 335), meta=np.ndarray>
    green        (time, y, x) uint16 dask.array<chunksize=(1, 381, 335), meta=np.ndarray>
    red          (time, y, x) uint16 dask.array<chunksize=(1, 381, 335), meta=np.ndarray>
    nir08        (time, y, x) uint16 dask.array<chunksize=(1, 381, 335), meta=np.ndarray>
    swir16       (time, y, x) uint16 dask.array<chunksize=(1, 381, 335), meta=np.ndarray>
    swir22       (time, y, x) uint16 dask.array<chunksize=(1, 381, 335), meta=np.ndarray>
    qa_pixel     (time, y, x) uint16 dask.array<chunksize=(1, 381, 335), meta=np.ndarray>
    qa_aerosol   (time, y, x) uint8 dask.array<chunksize=(1, 381, 335), meta=np.ndarray>
    qa_radsat    (time, y, x) uint16 dask.array<chunksize=(1, 381, 335), meta=np.ndarray>
Attributes:
    crs:           epsg:32718
    grid_mapping:  spatial_ref

We can check the total size of the dataset using nbytes. We'll divide by 2**30 to have the result display in gibibytes.

In [9]:
print(f"dataset size (GiB) {dataset.nbytes / 2**30:.2f}")
dataset size (GiB) 0.05

As you can see, this Region of Interest (ROI) and time range (1 year) produce a tiny dataset. Let's scale up by enlarging the ROI, i.e. by increasing the buffer.

In [10]:
buffer = 1

# Compute the bounding box for the study area
study_area_lat = (central_lat - buffer, central_lat + buffer)
study_area_lon = (central_lon - buffer, central_lon + buffer)
In [11]:
dataset = None # clear results from any previous runs
dataset = dc.load(
            product=products,
            x=study_area_lon,
            y=study_area_lat,
            time=set_time,
            measurements=measurements,
            resampling={"qa_pixel": "nearest", "*": "average"},
            output_crs=set_crs,
            resolution=set_resolution,
            dask_chunks={"time": 1},
            group_by=group_by,
        )
print(f"dataset size (GiB) {dataset.nbytes / 2**30:.2f}")
dataset size (GiB) 40.49

Okay, this should now be larger than the memory available on our Jupyter node (which you should be able to see at the bottom of your window - probably 28.00 or 29.00 GB). This creates issues for calculation: we need a way to compute the results we want without the machine running out of memory.

Dask can work through many tasks and large amounts of data over the course of a series of calculations. Collectively, these calculations may touch far more data in total than fits in RAM; it only becomes a problem if the final product is too big to fit in RAM. Below we will reduce the dataset so that the final result fits in RAM, and then use the .compute() function to run all the calculations.
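This pattern can be sketched with a synthetic stand-in for one band of the loaded dataset (shapes here are illustrative, shrunk for speed): the time-mean collapses all the per-time chunks into a single small (y, x) result before .compute() runs.

```python
import xarray as xr
import dask.array as da

# Synthetic stand-in for one band: (time, y, x), chunked one time
# step at a time, as dc.load produced above.
red = xr.DataArray(
    da.random.random((45, 760, 668), chunks=(1, 760, 668)),
    dims=("time", "y", "x"),
    name="red",
)

# The reduction is lazy: nothing is read or computed yet.
annual_mean = red.mean(dim="time")

# compute() walks the chunks a few at a time; only the small
# (y, x) result needs to fit in memory at the end.
result = annual_mean.compute()
print(result.shape)  # (760, 668)
```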

Let's take a look at the memory usage for one of the bands, we'll use red.

In [12]:
dataset.red
Out[12]:
<xarray.DataArray 'red' (time: 45, y: 7607, x: 6685)>
dask.array<dc_load_red, shape=(45, 7607, 6685), dtype=uint16, chunksize=(1, 7607, 6685), chunktype=numpy.ndarray>
Coordinates:
  * time         (time) datetime64[ns] 2021-01-03T14:38:55.375489 ... 2021-12...
  * y            (y) float64 6.8e+06 6.8e+06 6.8e+06 ... 6.572e+06 6.572e+06
  * x            (x) float64 7.629e+05 7.629e+05 ... 9.633e+05 9.634e+05
    spatial_ref  int32 32718
Attributes:
    units:         reflectance
    nodata:        0
    crs:           epsg:32718
    grid_mapping:  spatial_ref
Array: 4.26 GiB, 45 chunks in 1 graph layer; Chunk: 96.99 MiB, shape (1, 7607, 6685), uint16

You can see there are now more time observations in the year than in the first dataset: the expanded area of interest picks up multiple satellite passes. The spatial dimensions are also much larger.

Take note of the Chunk Bytes - around 97 MiB here. This is the smallest unit of this dataset that dask will do work on. To compute an NDVI, dask needs two bands, the mask, the result and a few other temporary variables in memory at once. The chunk size is therefore an indicator of the memory a worker needs to perform an operation, not the total, which will depend on the operation.
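To make the working-set point concrete, here is a lazy NDVI built from two synthetic bands (the band names match the dataset above, but the data is random and the spatial shape is shrunk for speed). Each task touches one chunk of each input plus intermediates and one output chunk, so per-task memory is a small multiple of the chunk size, not the full stack:

```python
import xarray as xr
import dask.array as da

shape, chunks = (45, 761, 669), (1, 761, 669)
dims = ("time", "y", "x")
red = xr.DataArray(da.random.random(shape, chunks=chunks), dims=dims)
nir = xr.DataArray(da.random.random(shape, chunks=chunks), dims=dims)

# Still lazy: this only builds the task graph.
ndvi = (nir - red) / (nir + red)

# Per-task working set is roughly one chunk of red + one of nir
# + intermediates + one output chunk, not the whole 45-step stack.
chunk_mib = red.data.chunksize[1] * red.data.chunksize[2] * 8 / 2**20
print(f"input chunk ~{chunk_mib:.1f} MiB, output chunks: {ndvi.data.npartitions}")
```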

We can adjust the amount of memory per chunk further by chunking the spatial dimension. Let's split it into 2048x2048 size pieces.

In [13]:
dataset = None # clear results from any previous runs
dataset = dc.load(
            product=products,
            x=study_area_lon,
            y=study_area_lat,
            time=set_time,
            measurements=measurements,
            resampling={"qa_pixel": "nearest", "*": "average"},
            output_crs=set_crs,
            resolution=set_resolution,
            dask_chunks={"time": 1, "x": 2048, "y": 2048},  # chunk the spatial dimensions as well
            group_by=group_by,
        )
print(f"dataset size (GiB) {dataset.nbytes / 2**30:.2f}")
dataset size (GiB) 40.49

As you can see the total dataset size stays the same.

Look at the red data variable below. You can see the chunk size has reduced to 8 MiB, and there are now 720 chunks compared with 45 previously. This will result in a higher number of tasks for Dask to work on. This makes sense: smaller chunks, more tasks.

TIP: The relationship between tasks and chunks is a critical tuning parameter.

Workers have limits in memory and compute capacity. The Dask scheduler has limits in how many tasks it can manage efficiently (and remember it is tracking all of the data variables, not just this one). The trick is to give Dask chunks of data that aren't too big but don't produce too many tasks. There is always a trade-off, and each calculation will be different.

Ideally, chunks should align with how the data is stored, or with how it is going to be used. If those two differ, rechunking can require large amounts of data to be held in cluster memory, which can cause failures. Likewise, chunks that are too large may exhaust worker memory and crash the cluster. Tuning this often comes down to trial and error.
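The trade-off can be counted directly. A sketch with an empty dask array of the same shape as this dataset shows the per-band task count at the two chunkings used in this notebook:

```python
import dask.array as da

# One chunk per time step, as in the first load above.
per_time = da.zeros((45, 7607, 6685), chunks=(1, 7607, 6685), dtype="uint16")
print(per_time.npartitions)  # 45 chunks -> 45 tasks per band

# Splitting the spatial dimensions multiplies the task count:
# 45 time steps * ceil(7607/2048) * ceil(6685/2048) = 45 * 4 * 4 = 720.
spatial = per_time.rechunk((1, 2048, 2048))
print(spatial.npartitions)  # 720
```

Note that rechunking an existing array adds its own tasks and data movement; specifying dask_chunks at load time, as the cell above does, avoids that cost.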

Later, when we move to a fully remote and distributed cluster, chunks also become an important element in communicating between workers over networks.

If you look carefully at the cube-like diagram in the summary below you will see some internal lines showing the chunk boundaries for the spatial dimensions. 2048 does not divide the spatial dimensions evenly, so dask has made the chunks at the edges smaller. The chunk specification is a guide: the actual data - numpy arrays in this case - is split into chunk-sized shapes or smaller. These are called blocks in dask and represent the actual shape of the numpy array that will be processed.

Somewhat confusingly, the terms block and chunk are used interchangeably in the dask literature, and you'll need to check the context to see whether it refers to the chunk specification or to an actual block of data. For the moment this distinction doesn't matter, but when performing low-level custom operations it does matter that your blocks might differ in shape.
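The uneven edge blocks can be inspected directly: a dask array records the actual block sizes along each dimension in its .chunks attribute. A sketch using the spatial shape of this dataset:

```python
import dask.array as da

x = da.zeros((7607, 6685), chunks=(2048, 2048))

# The requested chunk size is a guide; the last block along each
# dimension is smaller because 2048 does not divide the shape evenly.
print(x.chunks)
# ((2048, 2048, 2048, 1463), (2048, 2048, 2048, 541))
```

Low-level operations such as map_blocks therefore need to cope with blocks of varying shape.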

In [14]:
dataset.red
Out[14]:
<xarray.DataArray 'red' (time: 45, y: 7607, x: 6685)>
dask.array<dc_load_red, shape=(45, 7607, 6685), dtype=uint16, chunksize=(1, 2048, 2048), chunktype=numpy.ndarray>
Coordinates:
  * time         (time) datetime64[ns] 2021-01-03T14:38:55.375489 ... 2021-12...
  * y            (y) float64 6.8e+06 6.8e+06 6.8e+06 ... 6.572e+06 6.572e+06
  * x            (x) float64 7.629e+05 7.629e+05 ... 9.633e+05 9.634e+05
    spatial_ref  int32 32718
Attributes:
    units:         reflectance
    nodata:        0
    crs:           epsg:32718
    grid_mapping:  spatial_ref

We won't worry too much about tuning these parameters right now; instead we'll focus on processing this larger dataset. As before, we can exploit dask's delayed tasks and apply our masking and NDVI calculation directly to the full dataset. We'll also add an unweighted seasonal mean using groupby("time.season").mean("time"). Dask will seek to complete the reductions (chunk by chunk) first, as they reduce memory usage.

It's worth monitoring the cluster's memory usage via the dashboard's Workers Memory page to see just how little RAM is actually used during this calculation, despite it being performed on a large dataset.

In [15]:
print(dashboard_address)
https://hub.datacubechile.cl/user/jhodge/proxy/34023/status

We will now calculate NDVI and group the results by season:

In [16]:
# Identify pixels that don't have cloud, cloud shadow or water
from datacube.utils import masking

good_pixel_flags = {
    'nodata': False,
    'cloud': 'not_high_confidence',
    'cloud_shadow': 'not_high_confidence',
    'water': 'land_or_cloud'
}

cloud_free_mask = masking.make_mask(dataset['qa_pixel'], **good_pixel_flags)

# Apply the mask
cloud_free = dataset.where(cloud_free_mask)

# Calculate the components that make up the NDVI calculation
band_diff = cloud_free.nir08 - cloud_free.red
band_sum = cloud_free.nir08 + cloud_free.red

# Calculate NDVI
ndvi = band_diff / band_sum

ndvi_unweighted = ndvi.groupby("time.season").mean("time")  # Calculate the seasonal mean

Let's check the shape of our result - it should have 4 seasons now instead of the individual dates.

In [17]:
ndvi_unweighted
Out[17]:
<xarray.DataArray (season: 4, y: 7607, x: 6685)>
dask.array<stack, shape=(4, 7607, 6685), dtype=float64, chunksize=(1, 2048, 2048), chunktype=numpy.ndarray>
Coordinates:
  * y            (y) float64 6.8e+06 6.8e+06 6.8e+06 ... 6.572e+06 6.572e+06
  * x            (x) float64 7.629e+05 7.629e+05 ... 9.633e+05 9.634e+05
    spatial_ref  int32 32718
  * season       (season) object 'DJF' 'JJA' 'MAM' 'SON'

Before we call compute() to get our result, we should make sure the final result will fit in the Jupyter kernel's memory.

In [18]:
print(f"dataset size (GiB) {ndvi_unweighted.nbytes / 2**30:.2f}")
dataset size (GiB) 1.52

This shows that the result will be around 1.5 GiB of data, which will fit in local memory.
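
The nbytes figure can also be reasoned out by hand from the shape and dtype of the result, which makes a quick sanity check before committing to a compute():

```python
import numpy as np

# 4 seasons x 7607 y x 6685 x values, each a float64 (8 bytes).
shape = (4, 7607, 6685)
size_gib = np.prod(shape) * np.dtype("float64").itemsize / 2**30
print(f"dataset size (GiB) {size_gib:.2f}")  # dataset size (GiB) 1.52
```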

If you are monitoring the cluster when you run the cell below, you might notice a delay between running the cell and actual computation occurring. Dask performs a task graph optimisation step on the client, not the cluster. How long this takes depends on the number of tasks and the complexity of the graph; the speed of this step has improved in recent Dask releases. We'll talk more about this later.

In the meantime, run the next cell and watch dask compute the result without running out of memory. You might notice that your cluster spills some data to disk (the grey part of the bars in the Bytes stored per worker graph). This is not normally desirable and slows down the calculation (reading and writing to disk is slower than to RAM), but it is a mechanism Dask uses to manage large calculations.
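
The spill behaviour is governed by worker memory thresholds in the Dask configuration. The fractions below are the distributed defaults at the time of writing (check your version's documentation); they are shown only to illustrate where spilling kicks in, not as recommended settings:

```python
import dask

# Fractions of each worker's memory limit at which distributed acts.
dask.config.set({
    "distributed.worker.memory.target": 0.60,     # start spilling data to disk
    "distributed.worker.memory.spill": 0.70,      # spill based on process memory
    "distributed.worker.memory.pause": 0.80,      # pause accepting new tasks
    "distributed.worker.memory.terminate": 0.95,  # restart the worker
})
print(dask.config.get("distributed.worker.memory.target"))  # 0.6
```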

Tip: don't forget to look at your Dask Dashboard (URL a few cells above) to watch what is happening in your cluster

In [19]:
actual_result = ndvi_unweighted.compute()
/env/lib/python3.10/site-packages/rasterio/warp.py:344: NotGeoreferencedWarning: Dataset has no geotransform, gcps, or rpcs. The identity matrix will be returned.
  _reproject(
/env/lib/python3.10/site-packages/rasterio/warp.py:344: NotGeoreferencedWarning: Dataset has no geotransform, gcps, or rpcs. The identity matrix will be returned.
  _reproject(

To avoid northern/southern hemisphere differences, the season values are represented as acronyms of the months that make them up, so:

  • December, January, February = DJF
  • March, April, May = MAM
  • June, July, August = JJA
  • September, October, November = SON

Let's plot the result for DJF. This will take a few seconds since the image is several thousand pixels across.

In [20]:
actual_result.sel(season='DJF').plot(robust=True, size=6, aspect='equal')
Out[20]:
<matplotlib.collections.QuadMesh at 0x7fe725bf8c40>

Not the most useful visualisation as a small image, and a little slow. Dask can help with this too, but that's a topic for another notebook. There are many other ways to work with Dask and optimise performance; this is just the beginning of how to manage large calculations.

Be a good dask user - Clean up the cluster resources

In [21]:
client.close()

cluster.close()